/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.protocol.http;

// JDK imports

import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.SpellCheckedMetadata;
import org.apache.nutch.net.protocols.HttpDateFormat;
import org.apache.nutch.net.protocols.Response;
import org.apache.nutch.protocol.ProtocolException;
import org.apache.nutch.protocol.http.api.HttpBase;
import org.apache.nutch.protocol.http.api.HttpException;
import org.apache.nutch.storage.WebPage;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * An HTTP response.
 *
 * <p>
 * The whole exchange happens in the constructor: a socket is opened (wrapped
 * in TLS for {@code https} URLs), a single {@code HTTP/1.0 GET} request is
 * written, and the status line, headers and body are read before the socket
 * is closed again. The parsed result is then available through the
 * {@link Response} accessors.
 */
public class HttpResponse implements Response {

	private Configuration conf;
	private final HttpBase http;
	private final URL url;
	private byte[] content;
	private int code;
	private final Metadata headers = new SpellCheckedMetadata();

	/** URL schemes this class is able to fetch. */
	protected enum Scheme {
		HTTP, HTTPS,
	}

	/**
	 * Fetches {@code url} and stores status code, headers and content.
	 *
	 * @param http
	 *            protocol plugin supplying timeouts, proxy settings, TLS
	 *            preferences, user agent and the content size limit
	 * @param url
	 *            the URL to fetch; its protocol must be {@code http} or
	 *            {@code https}
	 * @param page
	 *            the page row; its modified time is sent as
	 *            {@code If-Modified-Since}, and it receives the response
	 *            headers and (if {@code store.ip.address} is set) the IP
	 *            address of the connected peer
	 * @throws ProtocolException
	 *             if the scheme is unsupported or the response is malformed
	 * @throws IOException
	 *             on any socket level failure
	 */
	public HttpResponse(HttpBase http, URL url, WebPage page)
			throws ProtocolException, IOException {

		this.http = http;
		this.url = url;

		Scheme scheme;
		if ("http".equals(url.getProtocol())) {
			scheme = Scheme.HTTP;
		} else if ("https".equals(url.getProtocol())) {
			scheme = Scheme.HTTPS;
		} else {
			throw new HttpException(
					"Unknown scheme (not http/https) for url:" + url);
		}

		if (Http.LOG.isTraceEnabled()) {
			Http.LOG.trace("fetching " + url);
		}

		String path = "".equals(url.getFile()) ? "/" : url.getFile();

		// some servers will redirect a request with a host line like
		// "Host: <hostname>:80" to "http://<hostname>/<orig_path>" - they
		// don't want the :80, so the port is only sent when the URL names
		// one explicitly.

		String host = url.getHost();
		int port;
		String portString;
		if (url.getPort() == -1) {
			port = (scheme == Scheme.HTTP) ? 80 : 443;
			portString = "";
		} else {
			port = url.getPort();
			portString = ":" + port;
		}
		Socket socket = null;

		try {
			socket = new Socket(); // create the socket
			socket.setSoTimeout(http.getTimeout());

			// connect (to the proxy instead of the origin when one is used)
			String sockHost = http.useProxy() ? http.getProxyHost() : host;
			int sockPort = http.useProxy() ? http.getProxyPort() : port;
			InetSocketAddress sockAddr = new InetSocketAddress(sockHost,
					sockPort);
			socket.connect(sockAddr, http.getTimeout());

			if (scheme == Scheme.HTTPS) {
				SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory
						.getDefault();
				SSLSocket sslsocket = (SSLSocket) factory.createSocket(socket,
						sockHost, sockPort, true);
				sslsocket.setUseClientMode(true);

				// Get the protocols and ciphers supported by this JVM
				Set<String> protocols = new HashSet<String>(
						Arrays.asList(sslsocket.getSupportedProtocols()));
				Set<String> ciphers = new HashSet<String>(
						Arrays.asList(sslsocket.getSupportedCipherSuites()));

				// Intersect with preferred protocols and ciphers
				protocols.retainAll(http.getTlsPreferredProtocols());
				ciphers.retainAll(http.getTlsPreferredCipherSuites());

				sslsocket.setEnabledProtocols(
						protocols.toArray(new String[protocols.size()]));
				sslsocket.setEnabledCipherSuites(
						ciphers.toArray(new String[ciphers.size()]));

				sslsocket.startHandshake();
				// from here on talk through the TLS layer; closing it in the
				// finally block also closes the wrapped socket (autoClose)
				socket = sslsocket;
			}

			conf = http.getConf();
			if (conf.getBoolean("store.ip.address", false)) {
				// remember the ip address we actually connected to
				String ipString = sockAddr.getAddress().getHostAddress();
				page.getMetadata().put(new Utf8("_ip_"),
						ByteBuffer.wrap(ipString.getBytes("UTF-8")));
			}

			// make request
			OutputStream req = socket.getOutputStream();

			StringBuilder reqStr = new StringBuilder("GET ");
			if (http.useProxy()) {
				// proxies need the absolute URL in the request line
				reqStr.append(url.getProtocol()).append("://").append(host)
						.append(portString).append(path);
			} else {
				reqStr.append(path);
			}

			reqStr.append(" HTTP/1.0\r\n");

			reqStr.append("Host: ");
			reqStr.append(host);
			reqStr.append(portString);
			reqStr.append("\r\n");

			reqStr.append("Accept-Encoding: x-gzip, gzip\r\n");

			reqStr.append("Accept: ");
			reqStr.append(this.http.getAccept());
			reqStr.append("\r\n");

			String userAgent = http.getUserAgent();
			if ((userAgent == null) || (userAgent.length() == 0)) {
				if (Http.LOG.isErrorEnabled()) {
					Http.LOG.error("User-agent is not set!");
				}
			} else {
				reqStr.append("User-Agent: ");
				reqStr.append(userAgent);
				reqStr.append("\r\n");
			}

			reqStr.append("If-Modified-Since: ");
			reqStr.append(HttpDateFormat.toString(page.getModifiedTime()));
			reqStr.append("\r\n");
			reqStr.append("\r\n");

			// explicit charset: do not depend on the platform default
			byte[] reqBytes = reqStr.toString().getBytes("UTF-8");

			req.write(reqBytes);
			req.flush();

			PushbackInputStream in = // process response
					new PushbackInputStream(
							new BufferedInputStream(socket.getInputStream(),
									Http.BUFFER_SIZE),
							Http.BUFFER_SIZE);

			StringBuilder line = new StringBuilder();

			// skip over any number of interim "100 Continue" responses
			boolean haveSeenNonContinueStatus = false;
			while (!haveSeenNonContinueStatus) {
				// parse status code line
				this.code = parseStatusLine(in, line);
				// parse headers
				parseHeaders(in, line);
				haveSeenNonContinueStatus = code != 100; // 100 is "Continue"
			}

			String transferEncoding = getHeader(Response.TRANSFER_ENCODING);
			if (transferEncoding != null
					&& "chunked".equalsIgnoreCase(transferEncoding.trim())) {
				readChunkedContent(in, line);
			} else {
				readPlainContent(in);
			}

			String contentEncoding = getHeader(Response.CONTENT_ENCODING);
			if ("gzip".equals(contentEncoding)
					|| "x-gzip".equals(contentEncoding)) {
				content = http.processGzipEncoded(content, url);
			} else {
				if (Http.LOG.isTraceEnabled()) {
					Http.LOG.trace(
							"fetched " + content.length + " bytes from " + url);
				}
			}

			// replace the headers stored in the row with the ones just
			// received; nothing can be stored if the row has no header map
			if (page.getHeaders() != null) {
				page.getHeaders().clear();
				for (String key : headers.names()) {
					page.getHeaders().put(new Utf8(key),
							new Utf8(headers.get(key)));
				}
			}

		} finally {
			if (socket != null) {
				socket.close();
			}
		}

	}

	/*
	 * ------------------------- * <implementation:Response> *
	 * -------------------------
	 */

	/** Returns the URL this response was fetched from. */
	public URL getUrl() {
		return url;
	}

	/** Returns the numeric status code of the final status line. */
	public int getCode() {
		return code;
	}

	/** Returns the value stored for the given header name, if any. */
	public String getHeader(String name) {
		return headers.get(name);
	}

	/** Returns all response headers collected while parsing. */
	public Metadata getHeaders() {
		return headers;
	}

	/** Returns the response body (after gzip processing, if it applied). */
	public byte[] getContent() {
		return content;
	}

	/*
	 * ------------------------- * </implementation:Response> *
	 * -------------------------
	 */

	/**
	 * Reads a non-chunked body into {@link #content}.
	 *
	 * <p>
	 * At most {@code Content-Length} bytes are read, further capped by
	 * {@code http.getMaxContent()} when that limit is non-negative. A missing,
	 * empty or negative {@code Content-Length} means "read until end of
	 * stream" (still subject to the cap).
	 *
	 * @param in
	 *            stream positioned at the first body byte
	 * @throws HttpException
	 *             if {@code Content-Length} is not a number
	 * @throws IOException
	 *             if reading fails
	 */
	private void readPlainContent(InputStream in)
			throws HttpException, IOException {

		int contentLength = Integer.MAX_VALUE; // get content length
		String contentLengthString = headers.get(Response.CONTENT_LENGTH);
		if (contentLengthString != null) {
			contentLengthString = contentLengthString.trim();
			try {
				if (!contentLengthString.isEmpty()) {
					int declared = Integer.parseInt(contentLengthString);
					if (declared >= 0) {
						contentLength = declared;
					}
				}
			} catch (NumberFormatException e) {
				throw new HttpException(
						"bad content length: " + contentLengthString, e);
			}
		}

		// limit download size
		int maxContent = http.getMaxContent();
		if (maxContent >= 0 && contentLength > maxContent) {
			contentLength = maxContent;
		}

		ByteArrayOutputStream out = new ByteArrayOutputStream(Http.BUFFER_SIZE);
		byte[] bytes = new byte[Http.BUFFER_SIZE];
		int length = 0;
		// read content; every read (including the first one) is bounded by
		// the remaining budget so the limit can never be overshot
		while (length < contentLength) {
			int toRead = Math.min(bytes.length, contentLength - length);
			int len = in.read(bytes, 0, toRead);
			if (len == -1) {
				break;
			}
			out.write(bytes, 0, len);
			length += len;
		}
		content = out.toByteArray();
	}

	/**
	 * Reads a body sent with {@code Transfer-Encoding: chunked} into
	 * {@link #content}.
	 *
	 * <p>
	 * Chunks are concatenated until the zero-length terminating chunk, after
	 * which trailer headers are parsed. If {@code http.getMaxContent()} is
	 * non-negative and the accumulated body would exceed it, the body is
	 * truncated at exactly that many bytes and reading stops (trailers are
	 * not read in that case).
	 *
	 * @param in
	 *            stream positioned at the first chunk-size line
	 * @param line
	 *            scratch buffer reused for line parsing
	 * @throws HttpException
	 *             if a chunk size is malformed or the stream ends inside a
	 *             chunk
	 * @throws IOException
	 *             if reading fails
	 */
	private void readChunkedContent(PushbackInputStream in, StringBuilder line)
			throws HttpException, IOException {
		final int maxContent = http.getMaxContent();
		boolean truncated = false;
		int contentBytesRead = 0;
		byte[] bytes = new byte[Http.BUFFER_SIZE];
		ByteArrayOutputStream out = new ByteArrayOutputStream(Http.BUFFER_SIZE);

		while (!truncated) {
			if (Http.LOG.isTraceEnabled()) {
				Http.LOG.trace("Http: starting chunk");
			}

			readLine(in, line, false);

			// the size may be followed by ";chunk-extension", which is ignored
			int pos = line.indexOf(";");
			String chunkLenStr = (pos < 0) ? line.toString()
					: line.substring(0, pos);
			chunkLenStr = chunkLenStr.trim();

			int chunkLen;
			try {
				chunkLen = Integer.parseInt(chunkLenStr, 16);
			} catch (NumberFormatException e) {
				throw new HttpException("bad chunk length: " + line.toString(),
						e);
			}
			if (chunkLen < 0) {
				throw new HttpException("bad chunk length: " + line.toString());
			}

			if (chunkLen == 0) {
				// last-chunk marker
				break;
			}

			// contentBytesRead never exceeds maxContent, so the subtraction
			// cannot overflow (unlike contentBytesRead + chunkLen)
			if (maxContent >= 0 && chunkLen > maxContent - contentBytesRead) {
				chunkLen = maxContent - contentBytesRead;
				truncated = true;
			}

			// read one chunk
			int chunkBytesRead = 0;
			while (chunkBytesRead < chunkLen) {

				int toRead = Math.min(chunkLen - chunkBytesRead,
						Http.BUFFER_SIZE);
				int len = in.read(bytes, 0, toRead);

				if (len == -1) {
					throw new HttpException("chunk eof after "
							+ contentBytesRead + " bytes in successful chunks"
							+ " and " + chunkBytesRead + " in current chunk");
				}

				out.write(bytes, 0, len);
				chunkBytesRead += len;
			}
			contentBytesRead += chunkBytesRead;

			if (!truncated) {
				// consume the line terminator that follows the chunk data
				readLine(in, line, false);
			}
		}

		content = out.toByteArray();
		if (!truncated) {
			// trailer headers after the last chunk
			parseHeaders(in, line);
		}
	}

	/**
	 * Reads the status line and extracts the numeric status code, i.e. the
	 * token after the first space. A reason phrase is optional, so both
	 * "HTTP/1.1 200" and "HTTP/1.1 200 OK" are accepted.
	 *
	 * @param in
	 *            stream positioned at the start of the status line
	 * @param line
	 *            scratch buffer; holds the status line on return
	 * @return the parsed status code
	 * @throws IOException
	 *             if reading fails or the stream ends before a line end
	 * @throws HttpException
	 *             if no numeric status code can be parsed
	 */
	private int parseStatusLine(PushbackInputStream in, StringBuilder line)
			throws IOException, HttpException {
		readLine(in, line, false);

		int codeStart = line.indexOf(" ");
		int codeEnd = line.indexOf(" ", codeStart + 1);

		// handle lines with no plaintext result code, ie:
		// "HTTP/1.1 200" vs "HTTP/1.1 200 OK"
		if (codeEnd == -1) {
			codeEnd = line.length();
		}

		try {
			return Integer.parseInt(line.substring(codeStart + 1, codeEnd));
		} catch (NumberFormatException e) {
			throw new HttpException(
					"bad status line '" + line + "': " + e.getMessage(), e);
		}
	}

	/**
	 * Splits one header line at the first colon and stores it in
	 * {@link #headers}. Spaces and tabs after the colon are skipped. A line
	 * consisting only of whitespace is silently ignored.
	 *
	 * @param line
	 *            the complete (possibly unfolded) header line
	 * @throws HttpException
	 *             if a non-blank line contains no colon
	 */
	private void processHeaderLine(StringBuilder line)
			throws IOException, HttpException {

		int colonIndex = line.indexOf(":"); // key is up to colon
		if (colonIndex == -1) {
			int i = 0;
			while (i < line.length()
					&& Character.isWhitespace(line.charAt(i))) {
				i++;
			}
			if (i == line.length()) {
				return;
			}
			throw new HttpException("No colon in header:" + line);
		}
		String key = line.substring(0, colonIndex);

		int valueStart = colonIndex + 1; // skip whitespace
		while (valueStart < line.length()) {
			int c = line.charAt(valueStart);
			if (c != ' ' && c != '\t') {
				break;
			}
			valueStart++;
		}
		String value = line.substring(valueStart);
		headers.set(key, value);
	}

	/**
	 * Reads header lines up to the terminating blank line and adds them to
	 * {@link #headers}.
	 *
	 * <p>
	 * Some servers omit the blank line between headers and body. If a line
	 * contains the start of an HTML document, that part is pushed back onto
	 * the stream as body data, whatever precedes it is processed as a final
	 * header, and parsing stops.
	 *
	 * @param in
	 *            stream positioned at the first header line
	 * @param line
	 *            scratch buffer reused for line parsing
	 * @throws IOException
	 *             if reading fails or the stream ends before a line end
	 * @throws HttpException
	 *             if a header line is malformed
	 */
	private void parseHeaders(PushbackInputStream in, StringBuilder line)
			throws IOException, HttpException {

		while (readLine(in, line, true) != 0) {

			// handle HTTP responses with missing blank line after headers
			int pos;
			if (((pos = line.indexOf("<!DOCTYPE")) != -1)
					|| ((pos = line.indexOf("<HTML")) != -1)
					|| ((pos = line.indexOf("<html")) != -1)) {

				// readLine() maps every byte to one char, so ISO-8859-1 is
				// the only encoding that restores the original bytes
				in.unread(line.substring(pos).getBytes("ISO-8859-1"));
				line.setLength(pos);

				try {
					// TODO: (CM) We don't know the header names here
					// since we're just handling them generically. It would
					// be nice to provide some sort of mapping function here
					// for the returned header names to the standard metadata
					// names in the ParseData class
					processHeaderLine(line);
				} catch (Exception e) {
					// best effort: the leftover in front of the markup is
					// frequently not a valid header, so only log the failure
					Http.LOG.error("Failed with the following exception: ", e);
				}
				return;
			}

			processHeaderLine(line);
		}
	}

	/**
	 * Reads one line into {@code line}, without its terminator. CR, LF and
	 * CRLF are all accepted as line ends.
	 *
	 * @param in
	 *            stream to read from
	 * @param line
	 *            receives the line; cleared first
	 * @param allowContinuedLine
	 *            if {@code true}, a non-blank line followed by a line that
	 *            starts with a space or tab is joined with that line (the
	 *            terminator and the one leading whitespace char are dropped)
	 * @return the length of the line read
	 * @throws EOFException
	 *             if the stream ends before a line terminator is seen
	 * @throws IOException
	 *             if reading fails
	 */
	private static int readLine(PushbackInputStream in, StringBuilder line,
			boolean allowContinuedLine) throws IOException {
		line.setLength(0);
		for (int c = in.read(); c != -1; c = in.read()) {
			switch (c) {
			case '\r':
				if (peek(in) == '\n') {
					in.read();
				}
				// deliberate fall through: CR / CRLF end the line like LF
			case '\n':
				if (line.length() > 0 && allowContinuedLine) {
					// at EOL -- check for continued line if the current
					// (possibly continued) line wasn't blank
					int next = peek(in);
					if (next == ' ' || next == '\t') {
						// line is continued
						in.read();
						continue;
					}
				}
				return line.length(); // else complete
			default:
				line.append((char) c);
			}
		}
		throw new EOFException();
	}

	/**
	 * Returns the next byte of the stream without consuming it, or -1 at end
	 * of stream.
	 */
	private static int peek(PushbackInputStream in) throws IOException {
		int value = in.read();
		if (value != -1) {
			// never push back the EOF marker: unread(int) would narrow it to
			// the data byte 0xFF and later reads would return 255
			in.unread(value);
		}
		return value;
	}

}
